package net.chengp.esload4j.source.mysql;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.util.StringUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;

import lombok.extern.slf4j.Slf4j;
import net.chengp.esload4j.config.ESLoad4jConfig.DBS;
import net.chengp.esload4j.config.ESLoad4jConfig.MysqlSourceConfig;
import net.chengp.esload4j.db.kv.MapDBUtils;
import net.chengp.esload4j.source.mysql.db.MySQLDaoImpl;
import net.chengp.esload4j.source.mysql.handle.MySQLEventHandlerContext;
import net.chengp.esload4j.target.es.ESLoad;

/**
 * 
 * @author chengp
 * @email E-mail:chengp@chengp.net
 * @version 创建时间：2020年9月12日 下午3:24:06 
 *
 */
@Slf4j
public class MySQLToESHandleThread implements Runnable {

	private MysqlSourceConfig mysqlSourceConfig;

	private ESLoad ESLoad;

	private static ExecutorService threads = null;

	private boolean initFlag = true;

	public MySQLToESHandleThread(MysqlSourceConfig mysqlSourceConfig, ESLoad ESLoad) {
		this.mysqlSourceConfig = mysqlSourceConfig;
		this.ESLoad = ESLoad;
		threads = Executors.newFixedThreadPool(mysqlSourceConfig.getSynDataThreadCount());
	}

	@Override
	public void run() {
		try {
			BinaryLogClient client = new BinaryLogClient(mysqlSourceConfig.getHost(), mysqlSourceConfig.getPort(),
					mysqlSourceConfig.getUsername(), mysqlSourceConfig.getPassword());
			String binlogStatus = MapDBUtils.get(mysqlSourceConfig.getId());
			if (!StringUtils.isEmpty(binlogStatus)) {
				client.setBinlogFilename(binlogStatus.split(",")[0]);
				client.setBinlogPosition(Long.valueOf(binlogStatus.split(",")[1]));
			}
			initESIndex();
			if (!initFlag) {
				System.exit(0);
			}
			client.registerEventListener(event -> {
				EventData data = event.getData();
				if (data == null) {
					return;
				}
				MapDBUtils.put(mysqlSourceConfig.getId(),
						client.getBinlogFilename() + "," + client.getBinlogPosition());
				new MySQLToESHandleService(mysqlSourceConfig, ESLoad).handle(data);
			});
			client.connect();
		} catch (Exception e) {
			log.error("同步MySQL数据至ES失败！");
		}
	}

	private void initESIndex() {
		List<DBS> dbs = mysqlSourceConfig.getDbs();
		if (dbs != null && dbs.size() > 0) {
			dbs.forEach(db -> {
				if (StringUtils.isEmpty(db.getDatabase()) || StringUtils.isEmpty(db.getTables())) {
					return;
				}
				String[] tables = db.getTables().split(",");
				for (String tableName : tables) {
					String index = StringUtils.isEmpty(db.getIndex()) ? db.getDatabase() + "-" + tableName
							: db.getIndex();
					try {
						MySQLEventHandlerContext.listenTable.put(db.getDatabase() + "." + tableName, db);
						if (ESLoad.indexExists(index)) {
							continue;
						}
						log.info("开始加载数据表：" + db.getDatabase() + "." + tableName + "至ES索引：" + index);
						long dataCount = new MySQLDaoImpl(mysqlSourceConfig).count(db.getDatabase() + "." + tableName);
						log.info("数据量：" + dataCount);
						ESLoad.createIndex(index, db.getAlias());
						log.info("索引创建成功！");
						log.info("开始写入数据...");
						int batchSize = 1000;
						for (int i = 0; i <= dataCount / batchSize; i++) {
							threads.execute(new MySQLToESDataLoadThread(mysqlSourceConfig, ESLoad,
									db.getDatabase() + "." + tableName, index, i * batchSize,
									i == dataCount / batchSize ? dataCount - i * batchSize : batchSize));
						}
						while (true) {
							try {
								Thread.sleep(1000 * 2);
								if (MySQLToESDataLoadThread.count.get() >= dataCount) {
									break;
								} else if (MySQLToESDataLoadThread.count.get() < 0) {
									initFlag = false;
									break;
								} else {
									log.info("已写入数据量：" + MySQLToESDataLoadThread.count + "，总数据量：" + dataCount);
								}
							} catch (Exception e) {
								log.error("检测任务完成状态异常.", e);
							}
						}
						if (initFlag) {
							log.info("写入数据完成。");
							log.info("数据表：" + db.getDatabase() + "." + tableName + "成功加载" + dataCount + "条数据至ES索引："
									+ index);
						} else {
							log.error("数据写入失败!");
						}
					} catch (Exception e) {
						log.error("数据表：" + db.getDatabase() + "." + tableName + "加载至ES索引：" + index + "失败！", e);
						initFlag = false;
						return;
					}
				}
			});
		}
	}

}
